package server import ( "io" "sync" "iter" "bufio" "slices" "compress/gzip" "encoding/binary" "git.citrons.xyz/metronode/phony" ) type mapManager struct { phony.Inbox Blocks *blockVolume } func newMapManager(mapSize blockPos) *mapManager { m := &mapManager {Blocks: &blockVolume{}} m.Blocks.init(mapSize) return m } func (m *mapManager) GetBlock( from phony.Actor, p blockPos, reply func(blockType)) { m.Act(from, func() { m.Blocks.RLock() defer m.Blocks.RUnlock() block := m.Blocks.unsyncGetBlock(p) from.Act(nil, func() {reply(block)}) }) } func (m *mapManager) SetBlock(from phony.Actor, p blockPos, t blockType) { m.Act(from, func() { m.Blocks.Lock() defer m.Blocks.Unlock() m.Blocks.unsyncSetBlock(p, t) }) } type blockVolume struct { sync.RWMutex size blockPos blockRuns []blockData changes []blockData dirty bool } type blockData struct { Pos uint32 Block blockType } func (v *blockVolume) init(size blockPos) { *v = blockVolume { size: size, dirty: true, blockRuns: make([]blockData, 1, 1024), changes: make([]blockData, 0, 1024), } } func (v *blockVolume) flattenPos(p blockPos) uint32 { return uint32(flattenPos(p, v.size)) } func (v *blockVolume) unsyncGetBlock(p blockPos) blockType { flatPos := v.flattenPos(p) if len(v.changes) != 0 { i := findIndex(v.changes, flatPos) if v.changes[i].Pos == flatPos { return v.changes[i].Block } } i := findIndex(v.blockRuns, flatPos) return v.blockRuns[i].Block } func (v *blockVolume) unsyncSetBlock(p blockPos, t blockType) { if len(v.changes) >= 1024 { v.unsyncFlushChanges() } data := blockData {v.flattenPos(p), t} i := findIndex(v.changes, data.Pos) if len(v.changes) > 0 { if v.changes[i].Pos < data.Pos { i++ } else if v.changes[i].Pos == data.Pos { v.changes[i] = data return } } v.changes = slices.Insert(v.changes, i, data) } func (v *blockVolume) unsyncFlushChanges() { if len(v.changes) == 0 { return } var ( oldRuns = v.blockRuns changes = v.changes newRuns = make([]blockData, 0, len(oldRuns)) ) for i, data := range oldRuns { var nextPos uint32 if i < len(oldRuns) - 1 { nextPos = oldRuns[i + 1].Pos } else { nextPos = uint32(v.size.X * v.size.Y * v.size.Z) } if len(newRuns) == 0 || newRuns[len(newRuns)-1].Block != data.Block { newRuns = append(newRuns, data) } for len(changes) > 0 { ch := changes[0] if ch.Pos >= nextPos { break } changes = changes[1:] prev := newRuns[len(newRuns)-1] if prev.Block == ch.Block { continue } if prev.Pos == ch.Pos { newRuns[len(newRuns)-1] = ch } newRuns = append(newRuns, ch) if ch.Pos + 1 < nextPos { newRuns = append(newRuns, blockData {ch.Pos + 1, prev.Block}) } } } v.blockRuns = newRuns v.changes = v.changes[:0] } func (v *blockVolume) unsyncGetAll() iter.Seq[blockType] { return func(yield func(blockType) bool) { var (runs = v.blockRuns; changes = v.changes) for i, data := range runs { var nextPos uint32 if i < len(runs) - 1 { nextPos = runs[i + 1].Pos } else { nextPos = uint32(v.size.X * v.size.Y * v.size.Z) } runLoop: for p := data.Pos; p < nextPos; p++ { for len(changes) != 0 { ch := changes[0] if ch.Pos > p { break } changes = changes[1:] if ch.Pos == p { if !yield(ch.Block) { return } continue runLoop } } if !yield(data.Block) { return } } } } } func (v *blockVolume) lockAndSetAll(blocks iter.Seq[blockType]) { var ( block blockType pos uint32 newRuns = make([]blockData, 0, 1024) ) for b := range blocks { if len(newRuns) == 0 || b != block { block = b newRuns = append(newRuns, blockData {pos, block}) } pos++ } newRuns = append(newRuns, blockData {pos, 0}) v.Lock() v.blockRuns = newRuns v.changes = v.changes[:0] } func (v *blockVolume) syncSetAll(blocks iter.Seq[blockType]) { defer v.Unlock() v.lockAndSetAll(blocks) } /* func (v *blockVolume) unsyncBlocksInRange(min, max blockPos) []blockType { // todo } */ func (v *blockVolume) syncCompressForNetwork(/*highBits bool*/) io.ReadCloser { rd, wr := io.Pipe() go func() { v.RLock() defer v.RUnlock() bw := bufio.NewWriter(wr) defer bw.Flush() writePointlessGzipHeader(bw) len := uint32(v.size.X*v.size.Y*v.size.Z) deflateRuns(wr, v.blockRuns, v.changes, len) writePointlessGzipTrailer(bw, len) }() return rd } func (v *blockVolume) syncCompressForStorage() io.ReadCloser { rd, wr := io.Pipe() go func() { defer wr.Close() z := gzip.NewWriter(wr) defer z.Close() bw := bufio.NewWriter(z) defer bw.Flush() v.RLock() defer v.RUnlock() writeBlockData(bw, v.blockRuns) writeBlockData(bw, v.changes) }() return rd } func (v *blockVolume) syncDecompressFromStorage(rd io.Reader) (err error) { var z *gzip.Reader z, err = gzip.NewReader(rd) if err != nil { return } var blockRuns, changes []blockData blockRuns, err = readBlockData(z) if err != nil { return } changes, err = readBlockData(z) if err != nil { return } v.Lock() defer v.Unlock() v.blockRuns = blockRuns v.changes = changes return } func findIndex(data []blockData, flatPos uint32) int { var (lo = 0; hi = len(data)) for lo + 1 < hi { p := (lo + hi) >> 1 x := data[p].Pos if (x > flatPos) { hi = p } else if (x < flatPos) { lo = p } else { return p } } return lo } func writeBlockData(wr io.Writer, data []blockData) error { err := binary.Write(wr, binary.LittleEndian, int64(len(data))) if err != nil { return err } for _, d := range data { err = binary.Write(wr, binary.LittleEndian, d) if err != nil { return err } } return nil } func readBlockData(rd io.Reader) ([]blockData, error) { var (len int64; data []blockData) err := binary.Read(rd, binary.LittleEndian, &len) if err != nil { return nil, err } data = make([]blockData, 0, len) var i int64 for i = 0; i < len; i++ { var d blockData err = binary.Read(rd, binary.LittleEndian, &d) if err != nil { return nil, err } data = append(data, d) } return data, nil }